package cn.mrcode.wxsdk.web.common.ticket.lifeCycle.distributed.strategy.master;

import cn.mrcode.wxsdk.core.context.ConfigContext;
import cn.mrcode.wxsdk.core.context.SdkContexts;
import cn.mrcode.wxsdk.core.dialogue.common.LogUtil;
import cn.mrcode.wxsdk.core.dialogue.common.PublicAccount;
import cn.mrcode.wxsdk.core.dialogue.common.accessToken.lifeCycle.distributed.strategy.master.LifeCycleLeaderSelectorListener;
import cn.mrcode.wxsdk.core.dialogue.common.exception.ReqException;
import cn.mrcode.wxsdk.core.dialogue.common.exception.WxException;
import cn.mrcode.wxsdk.core.dialogue.common.log.LogTemplateUtil;
import cn.mrcode.wxsdk.core.dialogue.common.util.DateUtil;
import cn.mrcode.wxsdk.core.zkhelper.curator.ClientHelper;
import cn.mrcode.wxsdk.core.zkhelper.curator.ZkPathHelper;
import cn.mrcode.wxsdk.web.common.ticket.TicketApi;
import cn.mrcode.wxsdk.web.protocol.base.JsApiTicketInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;

/**
 * Distributed service that maintains the life cycle of JS-API tickets.
 *
 * <p>Each instance connects to ZooKeeper, mirrors the ticket list stored at
 * {@link #masterPath} into its local caches, and registers a {@link LeaderListener}
 * whose callbacks fetch missing tickets, renew expired ones and publish the
 * resulting list back to {@link #masterPath}.
 *
 * @author zhuqiang
 * @version V1.0
 * @date 2015/9/22 17:22
 */
public class TicketDistributedTaskMaster {
    private static final Logger log = LoggerFactory.getLogger(TicketDistributedTaskMaster.class);

    /**
     * Pause (milliseconds) after a failed ticket renewal before the refresh loop
     * tries again. Without it an already-expired ticket is taken from the delay
     * queue again immediately, producing a tight retry loop.
     */
    private static final long RETRY_BACKOFF_MILLIS = 5000L;

    /** Caller-supplied cache (appID -> ticket) that is updated whenever the ZooKeeper data changes. */
    private final ConcurrentHashMap<String, JsApiTicketInfo> ticketMap;

    private final int sessionTimeout;
    private final String zkServiceList;
    /** Accounts (appId -> account) whose tickets should be maintained. */
    private final HashMap<String, PublicAccount> accountMap;
    private String id = "";
    public static final String masterPath = ConfigContext.masterPath_ticket;
    final String apiName = "分布式TicketDistributedTaskMaster";

    private CuratorFramework client;
    /** Tickets handled by the refresh task; the whole collection is serialized as a JSON array and uploaded to ZooKeeper. */
    private final ConcurrentHashMap<String, JsApiTicketInfo> taskMap = new ConcurrentHashMap<>();

    /**
     * @param sessionTimeout session timeout handed to {@code ClientHelper.createClient}
     * @param zkServiceList  ZooKeeper connection string handed to {@code ClientHelper.createClient}
     * @param accountMap     accounts (appId -> account) to maintain tickets for
     * @param ticketMap      shared cache (appID -> ticket) this service keeps up to date
     */
    public TicketDistributedTaskMaster(int sessionTimeout, String zkServiceList, HashMap<String, PublicAccount> accountMap, ConcurrentHashMap<String, JsApiTicketInfo> ticketMap) {
        this.sessionTimeout = sessionTimeout;
        this.zkServiceList = zkServiceList;
        this.accountMap = accountMap;
        this.ticketMap = ticketMap;
        this.id = SdkContexts.getConfigContext().getId();
    }

    /**
     * Connects to ZooKeeper, ensures the maintenance node exists, registers the
     * leader listener, loads the current ticket list once and then watches the
     * node for further changes.
     *
     * @throws Exception if any ZooKeeper/Curator operation fails
     */
    public void init() throws Exception {
        client = ClientHelper.createClient(zkServiceList, sessionTimeout);
        log.info(LogTemplateUtil.svMsg(apiName, "%s 链接zookpeer服务器成功", id));

        // Create the maintenance node (with empty data) when it does not exist yet.
        ZkPathHelper.createIfExistsNot(client, masterPath, new byte[0]);
        // NOTE(review): the instance is not retained; presumably the superclass
        // constructor registers it for leader election - confirm.
        new LeaderListener(client, masterPath, id);

        // Load the data from ZooKeeper once before subscribing to changes.
        refresh(decode(client.getData().forPath(masterPath)));
        final NodeCache nodeCache = new NodeCache(client, masterPath, false);
        nodeCache.start(true);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                // getCurrentData() is null when the node does not exist (e.g. it was deleted).
                ChildData current = nodeCache.getCurrentData();
                if (current == null) {
                    return;
                }
                refresh(decode(current.getData()));
            }
        });
    }

    /**
     * Replaces the local task list with the ticket list encoded in {@code data}
     * and copies every ticket into the shared {@code ticketMap}. Blank input is ignored.
     *
     * @param data JSON array of {@link JsApiTicketInfo}; may be blank
     */
    private void refresh(String data) {
        if (StringUtils.isBlank(data)) {
            return;
        }
        List<JsApiTicketInfo> list = JSON.parseArray(data, JsApiTicketInfo.class);
        if (list == null) {
            return;
        }
        Map<String, JsApiTicketInfo> fresh = new HashMap<>();
        for (JsApiTicketInfo ati : list) {
            // ConcurrentHashMap rejects null keys/values, so skip unusable entries.
            if (ati == null || ati.getAppID() == null) {
                continue;
            }
            fresh.put(ati.getAppID(), ati);
        }
        ticketMap.putAll(fresh);
        // Merge first and prune afterwards so taskMap is never observed empty by
        // the refresh loop, which rebuilds its delay queue from this map.
        taskMap.putAll(fresh);
        taskMap.keySet().retainAll(fresh.keySet());
        log.info(LogTemplateUtil.svMsg(apiName, "id=%s;将新数据已刷入本地缓存=%s", id, JSONObject.toJSONString(list)));
    }

    /** Decodes ZooKeeper node data as UTF-8; {@code null} data yields an empty string. */
    private static String decode(byte[] bytes) {
        return bytes == null ? "" : new String(bytes, StandardCharsets.UTF_8);
    }

    /** Serializes the current task list as a UTF-8 encoded JSON array for upload to ZooKeeper. */
    private byte[] serializeTasks() {
        return JSONObject.toJSONString(new ArrayList<JsApiTicketInfo>(taskMap.values())).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Callbacks invoked by {@link LifeCycleLeaderSelectorListener}: prepares the
     * task data and runs the loop that renews tickets as they expire.
     */
    private class LeaderListener extends LifeCycleLeaderSelectorListener {
        /** Rebuilt from taskMap on every pass of {@link #run()}; yields the ticket that expires first. */
        private DelayQueue<JsApiTicketInfo> queue = new DelayQueue<>();

        public LeaderListener(CuratorFramework client, String leaderPath, String id) {
            super(client, leaderPath, id);
        }

        @Override
        protected void refreshData(String data) {
            refresh(data);
        }

        /**
         * Fetches a ticket for every configured account that is not in the task
         * list yet, then uploads the task list to ZooKeeper.
         *
         * @throws Exception if writing to ZooKeeper fails
         */
        @Override
        protected void readyTaskData() throws Exception {
            log.info(LogTemplateUtil.svMsg(apiName, "初始化task任务数据"));
            for (Map.Entry<String, PublicAccount> ent : accountMap.entrySet()) {
                String appId = ent.getKey();
                if (taskMap.containsKey(appId)) {
                    continue; // already maintained, nothing to fetch
                }
                JsApiTicketInfo jsApiTicketInfo = getJsApiTicketInfo(appId, ent.getValue().getAppSecret());
                if (jsApiTicketInfo == null) {
                    // Log only the appId: the account entry carries the app secret.
                    log.error("获取ticket出错，该公众号本次不纳入维护列表：appId=" + appId);
                } else {
                    taskMap.put(appId, jsApiTicketInfo);
                }
            }
            // Publish what has been collected so far.
            client.setData().forPath(masterPath, serializeTasks());
        }

        /**
         * Refresh loop: waits for the next ticket to expire, renews it and
         * publishes the updated list to ZooKeeper. Any exception ends the loop
         * after calling {@code leaderClose()}.
         */
        @Override
        protected void run() {
            try {
                while (true) {
                    // Rebuild the queue each pass so changes applied to taskMap by refresh() are picked up.
                    queue.clear();
                    for (JsApiTicketInfo task : taskMap.values()) {
                        queue.add(task);
                    }
                    log.info(LogTemplateUtil.svMsg(apiName, "更新任务列表数量预览：taskMap.size=%s;queue.size=%s", taskMap.size() + "", queue.size() + ""));
                    JsApiTicketInfo ati = queue.take();  // blocks until a ticket has expired
                    // NOTE(review): the serialized ticket may include the app secret
                    // (JsApiTicketInfo exposes getAppSecret()) - consider masking it in logs.
                    log.info(LogTemplateUtil.svMsg(apiName, "id=%s;获取到超时JsApiTicketInfo，数据=%s；超时时间=%s",
                            id, JSONObject.toJSONString(ati), DateUtil.lFormat(ati.getTimeOut())));
                    String appID = ati.getAppID();
                    if (!taskMap.containsKey(appID)) {
                        log.error(LogTemplateUtil.svMsg(apiName, "%s,taskMap中不包含该appid(%s),忽略处理", id, appID));
                        continue;
                    }
                    JsApiTicketInfo renewed = getJsApiTicketInfo(appID, ati.getAppSecret());
                    if (renewed == null) {
                        log.error("获取ticket出错，该过期ticket本次忽略处理：" + JSON.toJSONString(ati));
                        // The expired ticket stays in taskMap and is retried on the next
                        // pass; back off first so a failing endpoint is not hammered.
                        Thread.sleep(RETRY_BACKOFF_MILLIS);
                        continue;
                    }
                    taskMap.put(appID, renewed);
                    log.info(LogTemplateUtil.svMsg(apiName, "id=%s;appID=%s 已经更新；新的JsApiTicketInfo数据=%s；超时时间=%s",
                            id, appID, JSONObject.toJSONString(renewed), DateUtil.lFormat(renewed.getTimeOut())));
                    // Write the updated list to ZooKeeper.
                    client.setData().forPath(masterPath, serializeTasks());
                }
            } catch (InterruptedException e) {
                leaderClose();
                log.error(LogTemplateUtil.svMsg(apiName, "id=%s;任务被中断,本机器已经放弃了leader领导权，异常信息=%s", id, e.toString()), e);
                // Restore the interrupt flag for the calling thread once cleanup is done.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                leaderClose();
                // Pass the throwable so the cause and stack trace are not lost.
                log.error(LogTemplateUtil.svMsg(apiName, "id=%s;任务被中断,本机器已经放弃了leader领导权", id), e);
            }
        }

        /**
         * Requests a ticket for the given account through {@code TicketApi}.
         *
         * @param appid  account app id
         * @param secret account app secret
         * @return the ticket info, or {@code null} when the request failed (the failure is logged)
         */
        private JsApiTicketInfo getJsApiTicketInfo(String appid,
                                                   String secret) {
            try {
                return TicketApi.getJsApiTicketInfoSetUpBy(appid, secret, id);
            } catch (ReqException e) {
                log.error(LogUtil.fromateLog(e));
            } catch (WxException e) {
                log.error(LogUtil.fromateLog(e));
            }
            return null;
        }
    }
}
